Interactive Beam Examples


In [1]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner

In [2]:
# Build a pipeline that is executed by the InteractiveRunner.
p = beam.Pipeline(interactive_runner.InteractiveRunner())
# Source PCollection created from the integers 0..9.
init_pcoll = p |  beam.Create(range(10))
# Two independent branches off the same source: x -> x*x and x -> x**3.
# The string before `>>` is the transform's label ('Square' / 'Cube').
squares = init_pcoll | 'Square' >> beam.Map(lambda x: x*x)
cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x**3)
# Run the pipeline and wait on the returned result object. `result` is kept
# because a later cell reads the computed values back with result.get(...).
result = p.run()
result.wait_until_finish()


Pipeline graph (G):
  [Cell 2: Create] -> init_pcoll
  init_pcoll -> [Cell 2: Square] -> squares
  init_pcoll -> [Cell 2: Cube] -> cubes

In [3]:
init_list = list(range(10))
squares_list = list(result.get(squares))
cubes_list = list(result.get(cubes))

squares_list.sort()
cubes_list.sort()

!pip install matplotlib

%matplotlib inline
from matplotlib import pyplot as plt
plt.scatter(init_list, squares_list, label='squares', color='red')
plt.scatter(init_list, cubes_list, label='cubes', color='blue')
plt.legend(loc='upper left')
plt.show()


Requirement already satisfied: matplotlib in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (3.1.1)
Requirement already satisfied: cycler>=0.10 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (0.10.0)
Requirement already satisfied: kiwisolver>=1.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.1.0)
Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.4.2)
Requirement already satisfied: python-dateutil>=2.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.8.0)
Requirement already satisfied: numpy>=1.11 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.17.3)
Requirement already satisfied: six in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from cycler>=0.10->matplotlib) (1.12.0)
Requirement already satisfied: setuptools in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from kiwisolver>=1.0.1->matplotlib) (41.6.0)

In [4]:
class AverageFn(beam.CombineFn):
  """CombineFn that computes the arithmetic mean of its inputs.

  The accumulator is a ``(running_sum, element_count)`` tuple; the mean is
  only computed at the end, in ``extract_output``.
  """

  def create_accumulator(self):
    """Returns the empty accumulator: a sum of 0.0 and a count of 0."""
    return (0.0, 0)

  def add_input(self, sum_count, input):
    """Folds one input value into an accumulator.

    Args:
      sum_count: the current ``(running_sum, element_count)`` accumulator.
      input: the value to add. The name shadows the builtin but is kept so
        the method signature is unchanged.

    Returns:
      A new ``(running_sum, element_count)`` tuple including ``input``.
    """
    # Named `total` rather than `sum` so the builtin sum() is not shadowed.
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    """Combines several accumulators into one.

    Args:
      accumulators: an iterable of ``(running_sum, element_count)`` tuples.

    Returns:
      A single ``(running_sum, element_count)`` tuple. An empty iterable
      yields the empty accumulator instead of raising.
    """
    # Materialize once: the iterable may be single-pass, and emptiness must
    # be checked before unpacking (zip(*[]) cannot be unpacked into two names).
    accumulators = list(accumulators)
    if not accumulators:
      return self.create_accumulator()
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    """Turns the final accumulator into the mean.

    Args:
      sum_count: the final ``(running_sum, element_count)`` accumulator.

    Returns:
      ``running_sum / element_count``, or NaN when the count is zero.
    """
    total, count = sum_count
    # Guard against division by zero when nothing was accumulated.
    return total / count if count else float('NaN')

In [5]:
# Extend the existing pipeline: apply AverageFn via CombineGlobally to each of
# the `squares` and `cubes` PCollections. Nothing is executed in this cell;
# the new transforms take effect on the next p.run().
average_square = squares | 'Average Square' >> beam.CombineGlobally(AverageFn())
average_cube = cubes | 'Average Cube' >> beam.CombineGlobally(AverageFn())

In [6]:
# Run the pipeline again, now including the averaging transforms, and rebind
# `result` to the new run. Note: unlike the first run, wait_until_finish() is
# not called in this cell.
result = p.run()


Pipeline graph (G):
  [Cell 2: Create] -> init_pcoll
  init_pcoll -> [Cell 2: Square] -> squares -> [Cell 5: Average Square] -> average_square
  init_pcoll -> [Cell 2: Cube] -> cubes -> [Cell 5: Average Cube] -> average_cube